from ma.commons.core.reducetaskrunner import ReduceTaskRunner
from ma.commons.core.abstractreducer import AbstractReducer
from ma.commons.core.processrunner import ProcessRunner
from .partitioner import Partitioner
import ma.commons.core.constants as constants
import ma.const
import ma.log

import sys
import pickle
import time
import os
import traceback



class MRReduceTaskRunner(ReduceTaskRunner):
    """Runs a user-defined reduce task in this (separate) process.

    Reads the shuffled map-output files for this reduce task, merges the
    per-file dictionaries key by key, runs the user's reducer class on each
    key, applies optional threshold detection, and writes the final
    dictionary to the local output file.
    """

    def __init__(self, tip_info, user_module_name, user_class_name):
        """Initialise the base runner and the task-side logger.

        tip_info         -- ReduceTIP info object (job_id, task_id, input_data, ...)
        user_module_name -- module containing the user's reducer class
        user_class_name  -- name of the user's reducer class
        """
        ReduceTaskRunner.__init__(self, tip_info, user_module_name, user_class_name)

        self.__log = ma.log.get_logger("ma.codes")

    def startTask(self):
        """Run the user-defined reduce code for every key of this task.

        Returns [False] on failure to write output, otherwise
        [True, list_of_thresholded_keys] where each entry is a
        (key, timestamp, task_id) tuple for keys whose reduced value
        crossed the configured threshold.

        Exits the process (os._exit(-1)) if a shuffled input file cannot
        be read.
        """
        partitioner = Partitioner(self.tip_info.job_id)
        hash_value = partitioner.taskIdToHash(self.tip_info.task_id)

        input_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, self.tip_info.job_id) + os.sep

        list_of_dict = []
        list_of_keys = []

        # to note the keys that are thresholded
        list_of_thresholded_keys = []

        for input_data in self.tip_info.input_data:
            if input_data[1] != constants.MAP:
                # BUG FIX: the original logged with a missing format argument
                # (two placeholders, one value) and then fell through to use an
                # undefined/stale `filename`. Log correctly and skip this input.
                self.__log.error("Error Reduce task %d has been given a reduce input: %s", self.tip_info.task_id, input_data)
                continue

            filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename_with_key, input_data[0], input_data[2], hash_value)

            # filepath for the shuffled input
            filepath = input_dest_dir + filename

            try:
                self.__log.info('Input file to reduce %d: %s', self.tip_info.task_id, filepath)

                # context manager guarantees the file is closed even on error
                with open(filepath, 'r') as f:
                    filecontents = f.read()

                self.__log.info('Input file to reduce %d read completely: %s', self.tip_info.task_id, filepath)

                # read dictionary and append to list
                # NOTE(review): eval() on on-disk shuffle data executes arbitrary
                # code if the file is tampered with; ast.literal_eval would be
                # safer if map outputs are plain literals -- confirm with writer.
                d = eval(filecontents)
                list_of_dict.append(d)

                # add all keys to our key list (preserving first-seen order)
                for key in d:
                    if key not in list_of_keys:
                        list_of_keys.append(key)

            except IOError as err_msg:
                self.__log.error("Error while reading reduce input or running task: %s", err_msg)
                os._exit(-1)

        # get the thresholding value
        threshold_value = ma.const.JobsXmlData.get_float_data(ma.const.xml_threshold_value, self.tip_info.job_id)

        # get the time scaleup value for reduce computation
        reduce_time_scaleup = ma.const.JobsXmlData.get_float_data(ma.const.xml_reduce_secs_inp_scaleup, self.tip_info.job_id)

        if reduce_time_scaleup > 0:
            self.__log.warning('Reduce time scale-up: reduce ID %d Job %d: %f', self.tip_info.task_id, self.tip_info.job_id, reduce_time_scaleup)

        # resolve the user's reducer class once, outside the per-key loop
        # (replaces the original string-built eval; same lookup, no eval)
        reducer_cls = getattr(self.user_class, self.user_class_name)

        # iterate through all keys
        for key in list_of_keys:
            # renamed from `list`, which shadowed the builtin
            values = []

            # collate all values from all dictionaries
            for d in list_of_dict:
                if key in d:
                    values += d[key]

            user_vars = [None, None, threshold_value, reduce_time_scaleup]

            th = reducer_cls(key, values, self.key_value_dict, user_vars)
            self.threads.append(th)

            # check if the Reducer is thresholdable
            thresholdable = th.thresholdable

            th.start()
            th.join()

            # if there is a valid threshold value and the Reducer is Thresholdable
            if threshold_value is not None and thresholdable == AbstractReducer.THRESHOLDABLE:
                # if the key has been added to the dictionary by the reduce worker
                if key in self.key_value_dict:
                    final_key_value = self.key_value_dict[key][0]

                    # if the key's value has crossed the threshold
                    if final_key_value > threshold_value:
                        print(("------ Reduce ID %d Job ID %d -----> KEY THRESHOLDED - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key))))
                        list_of_thresholded_keys.append((key, time.time(), self.tip_info.task_id))

        self.__log.info('Actual processing on reduce %d ENDED', self.tip_info.task_id)

        # write the dictionary to the output file
        if not self.writeOutputToLocalFile():
            # some error occurred
            return [ False ]
        else:
            # ran fine
            return [ True, list_of_thresholded_keys ]

if __name__ == "__main__":
    # Process entry point. The process creator launches this module with
    #   argv = [script, user_module_name, user_class_name, job_id, task_id]
    # and hands over the ReduceTIP info via a pickled temp file; the
    # thresholded-key list is pickled back through the same file.
    try:
        print("Running MRReduceTaskRunner process pid", os.getpid())

        if len(sys.argv) == 5:
            user_module_name = sys.argv[1]
            user_class_name = sys.argv[2]
            job_id = sys.argv[3]
            task_id = sys.argv[4]

            # get the filename which holds the reduce compute misc. output
            proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, job_id, task_id)

            # unpickle the payload; first element is the ReduceTIP info
            # (renamed from `list`, which shadowed the builtin)
            payload = ProcessRunner.pull_pickled_data(proc_temp_filename, job_id)
            reducetipinfo = payload[0]

            task_runner = MRReduceTaskRunner(reducetipinfo, user_module_name, user_class_name)
            # open files, process, write output
            ret = task_runner.startTask()

            if not ret[0]:
                # some error occurred
                os._exit(-1)
            else:
                # pack and pickle the thresholded-key list back to the ReduceTIP
                list_of_thresholded_keys = ret[1]

                # pickle the list and write to temp file
                proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, reducetipinfo.job_id, reducetipinfo.task_id)
                ProcessRunner.store_pickled_data([list_of_thresholded_keys], proc_temp_filename, reducetipinfo.job_id)

                # ran fine
                os._exit(0)
        else:
            print('Wrong number of arguments for ReduceTaskRunner process')
            os._exit(-1)

    except Exception as err_msg:
        # BUG FIX: traceback.print_exception() requires the exception as an
        # argument and raised TypeError here, masking the real error;
        # print_exc() prints the currently-handled exception's traceback.
        traceback.print_exc()
        print("MRReduceTaskRunner:__main__ Exception in running reduce: %s" % err_msg)
        os._exit(-1)
